Solutions/Trend Micro Vision One/Data Connectors/AzureFunctionTrendMicroXDR/shared_code/services/oat_service.py

""" Azure Table SDK Currently using azure-cosmosdb-table, see: https://pypi.org/project/azure-cosmosdb-table/ TODO: Use azure-data-tables instead, see: https://pypi.org/project/azure-data-tables/ E.g. create_table_if_not_exists only supported by azure-data-tables """ import gzip import uuid from datetime import datetime from io import BytesIO from typing import Any, Dict, List, Optional, Tuple import orjson import requests from azure.core.exceptions import ResourceNotFoundError from azure.data.tables import TableClient, TableServiceClient, UpdateMode from shared_code import configurations from shared_code.customized_logger.customized_json_logger import ( get_customized_json_logger, ) from shared_code.data_collector import LogAnalytics from shared_code.trace_utils.trace import trace_manager from shared_code.transform_utils import transform_oat_log logger = get_customized_json_logger() XDR_HOST_URL = configurations.get_xdr_host_url() OAT_ROWS_BULK_COUNT = configurations.get_oat_rows_bulk_count() WORKSPACE_ID = configurations.get_workspace_id() WORKSPACE_KEY = configurations.get_workspace_key() OAT_LOG_TYPE = configurations.get_oat_log_type() STORAGE_CONNECTION_STRING = configurations.get_storage_connection_string() OAT_PIPELINE_PROCESSED_INFO_TABLE = "OatPipelineFileProcessedInfo" REGISTER_STATUS_TABLE = "XdrOatRegisterStatus" def get_header(headers=None): headers = headers or {} trace_manager.trace_id = headers["x-trace-id"] = str(uuid.uuid4()) if trace_manager.task_id: headers["x-task-id"] = trace_manager.task_id headers["User-Agent"] = configurations.get_user_agent() return headers def get_trace_log(headers): log = "headers is empty." if headers: task_id = headers.get("x-task-id") trace_id = headers.get("x-trace-id") log = f"task id: {task_id}, trace id: {trace_id}." return log def update_oat_pipeline_config(token: str, patch_data: Dict[str, Any]) -> None: url = f"{XDR_HOST_URL}/beta/xdr/oat/dataPipeline" headers = get_header( { "Authorization": f"Bearer {token}", "Content-Type": "application/json", } ) logger.info(f"Update oat pipeline config api: {url} \ndata: {patch_data}") response = requests.patch(url, headers=headers, json=patch_data) logger.info( f"Update oat config response: {response.text}, " f"response headers: {get_trace_log(response.headers)}" ) response.raise_for_status() def get_oat_package_list( token: str, start_time: str, end_time: str, pipeline_id: str ) -> Tuple[int, List[Dict[str, str]]]: """Get OAT package list Args: token (str): account token start_time (str): The beginning of the time interval end_time (str): The end of the time interval pipeline_id (str): Unique alphanumeric string that identifies a data pipeline. 
def get_oat_package_list(
    token: str, start_time: str, end_time: str, pipeline_id: str
) -> Tuple[int, List[Dict[str, str]]]:
    """Get OAT package list

    Args:
        token (str): account token
        start_time (str): The beginning of the time interval
        end_time (str): The end of the time interval
        pipeline_id (str): Unique alphanumeric string that identifies a data pipeline.

    Returns:
        Tuple[int, List[Dict[str, str]]]: Number of items retrieved and package list
    """
    query_params = {
        "startDateTime": start_time,
        "endDateTime": end_time,
    }
    url = f"{XDR_HOST_URL}/v1.0/preview/ath/oat/dataPipelines/{pipeline_id}/packages"
    headers = get_header(
        {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
    )

    logger.info(f"Get oat package list: {url}\nquery params: {query_params}")

    response = requests.get(url, headers=headers, params=query_params)

    logger.info(
        f"Get oat package list response: {response.text}, "
        f"response headers: {get_trace_log(response.headers)}"
    )

    if response.status_code == requests.codes.bad_request:
        response_data = response.json()
        if (
            response_data.get("error", {}).get("innererror", {}).get("code", "")
            == "OutOfRetentionTime"
        ):
            logger.warning(
                "The OAT file is out of retention time, "
                f"start_time: {start_time}, end_time: {end_time}"
            )
            return 0, []

    if response.status_code in [requests.codes.forbidden, requests.codes.not_found]:
        logger.error(f"response status code: {response.status_code}")
        return 0, []

    response.raise_for_status()

    response_data = response.json()
    package_list = response_data["items"]
    count = response_data["count"]

    return count, package_list


def download_oat_file(
    token: str, oat_file_id: str, pipeline_id: str
) -> Optional[BytesIO]:
    """Download OAT package

    Args:
        token (str): account token
        oat_file_id (str): Identification associated with an OAT event
        pipeline_id (str): Unique alphanumeric string that identifies a data pipeline.

    Returns:
        Optional[BytesIO]
    """
    url = f"{XDR_HOST_URL}/v1.0/preview/ath/oat/dataPipelines/{pipeline_id}/packages/{oat_file_id}"
    headers = get_header(
        {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/gzip",
        }
    )

    logger.info(f"Download oat file url: {url}")

    response = requests.get(url, headers=headers)

    logger.info(
        f"Download oat file response status_code: {response.status_code}, "
        f"response headers: {get_trace_log(response.headers)}"
    )

    if response.status_code == requests.codes.bad_request:
        response_data = response.json()
        if (
            response_data.get("error", {}).get("innererror", {}).get("code", "")
            == "OutOfRetentionTime"
        ):
            logger.warning(
                f"The OAT file is out of retention time, file_id: {oat_file_id}"
            )
            return None

    if response.status_code in [requests.codes.forbidden, requests.codes.not_found]:
        logger.error(f"response status code: {response.status_code}")
        return None

    response.raise_for_status()

    # Keep the payload as an in-memory gzip stream; _file_line_generator()
    # decompresses it again when iterating over the rows.
    bytes_io = BytesIO()
    with gzip.GzipFile(filename="", fileobj=bytes_io, mode="wb") as fp:
        bytes_written = fp.write(response.content)
    bytes_io.seek(0)

    logger.info(f"Download oat file total bytes: {bytes_written}")

    return bytes_io


def _file_line_generator(file: BytesIO) -> Iterator[str]:
    try:
        file.seek(0)
        with gzip.open(file, mode="rt") as fp:
            for line in fp:
                yield line
    except Exception as e:
        logger.exception(f"Fail to generate file rows. Exception: {e}")
        raise


def _send_logs_to_log_analytics(logs: List[Dict[str, Any]]) -> None:
    log_analytics = LogAnalytics(WORKSPACE_ID, WORKSPACE_KEY, OAT_LOG_TYPE)
    log_analytics.post_data(logs)
    logger.info(f"Send oat data to Sentinel successfully. count: {len(logs)}")
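
# Minimal sketch of how the helpers above fit together: download_oat_file()
# buffers the package as an in-memory gzip stream, and _file_line_generator()
# decompresses that same stream line by line, so the two are meant to be used
# as a pair. `token`, `oat_file_id`, and `pipeline_id` are placeholder inputs.
#
#     file = download_oat_file(token, oat_file_id, pipeline_id)
#     if file is not None:
#         for line in _file_line_generator(file):
#             log = orjson.loads(line)
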
def get_oat_file_processed_lines(
    table_client: TableClient, file_id: str, clp_id: str
) -> Optional[int]:
    try:
        entity = table_client.get_entity(file_id, clp_id)
        return entity["processed_lines"]
    except ResourceNotFoundError:
        return None


def update_oat_file_processed_lines(
    table_client: TableClient, clp_id: str, file_id: str, processed_lines: int
) -> None:
    logger.info(
        f"Update oat file processed lines. clp_id: {clp_id}, "
        f"file_id: {file_id}, {processed_lines} lines."
    )
    table_client.upsert_entity(
        {
            "PartitionKey": file_id,
            "RowKey": clp_id,
            "processed_lines": processed_lines,
        },
        mode=UpdateMode.MERGE,
    )


def delete_oat_file_processed_lines(
    table_client: TableClient, clp_id: str, file_id: str
) -> None:
    logger.info(f"Delete processed record. clp_id: {clp_id}, file_id: {file_id}.")
    table_client.delete_entity(file_id, clp_id)


def oat_file_handler(clp_id: str, file_id: str, file: BytesIO) -> None:
    table_service_client = TableServiceClient.from_connection_string(
        conn_str=STORAGE_CONNECTION_STRING
    )
    table_service_client.create_table_if_not_exists(OAT_PIPELINE_PROCESSED_INFO_TABLE)

    table_client = TableClient.from_connection_string(
        conn_str=STORAGE_CONNECTION_STRING,
        table_name=OAT_PIPELINE_PROCESSED_INFO_TABLE,
    )

    processed_lines = get_oat_file_processed_lines(table_client, file_id, clp_id)
    if processed_lines:
        logger.info(f"The OAT gzip has been processed {processed_lines} lines.")

    oat_logs = []
    row_idx = -1  # keeps the post-loop checks safe when the file yields no rows
    for row_idx, row in enumerate(_file_line_generator(file)):
        if processed_lines and row_idx <= processed_lines:
            continue

        try:
            log = orjson.loads(row)
        except Exception:
            logger.exception(f"Row is not a valid json. {row=}")
            continue

        transformed_oat_log = transform_oat_log(clp_id, log)
        if transformed_oat_log:
            oat_logs.append(transformed_oat_log)
        else:
            logger.info(
                f"[oat_file_handler] No allowed fields found in the log. clp_id: {clp_id}"
            )

        if len(oat_logs) == OAT_ROWS_BULK_COUNT:
            _send_logs_to_log_analytics(oat_logs)
            oat_logs = []
            update_oat_file_processed_lines(table_client, clp_id, file_id, row_idx)

    if oat_logs:
        _send_logs_to_log_analytics(oat_logs)

    if row_idx >= OAT_ROWS_BULK_COUNT:
        delete_oat_file_processed_lines(table_client, clp_id, file_id)
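
# Checkpointing in oat_file_handler(), summarized: transformed rows are flushed
# to Log Analytics in batches of OAT_ROWS_BULK_COUNT, and after each flush the
# index of the last processed row is upserted into OatPipelineFileProcessedInfo.
# On a re-run for the same file (e.g. after a timeout), rows up to that
# checkpoint are skipped and processing resumes at the next row; once the file
# finishes, the checkpoint entity is deleted. For example, assuming every row
# produces a log and OAT_ROWS_BULK_COUNT is 1000, a failure after the second
# flush leaves a checkpoint at row index 1999, so a re-run resumes at row 2000.
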
def get_oat_pipeline_id(clp_id):
    try:
        table_service_client = TableServiceClient.from_connection_string(
            conn_str=STORAGE_CONNECTION_STRING
        )
        table_service_client.create_table_if_not_exists(REGISTER_STATUS_TABLE)

        table_client = TableClient.from_connection_string(
            conn_str=STORAGE_CONNECTION_STRING, table_name=REGISTER_STATUS_TABLE
        )
        entity = table_client.get_entity(clp_id, clp_id)
        return entity.get("pipelineId", None)
    except ResourceNotFoundError:
        return None


def update_oat_pipeline_id(clp_id, pipeline_id):
    table_client = TableClient.from_connection_string(
        conn_str=STORAGE_CONNECTION_STRING, table_name=REGISTER_STATUS_TABLE
    )
    table_client.upsert_entity(
        {
            "PartitionKey": clp_id,
            "RowKey": clp_id,
            "pipelineId": pipeline_id,
        },
        mode=UpdateMode.MERGE,
    )


def get_pipeline_id_for_migration(token: str) -> str:
    """Get pipeline id from single config api

    Returns:
        str: pipelineId
    """
    url = f"{XDR_HOST_URL}/beta/xdr/oat/dataPipeline"
    headers = get_header(
        {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
    )

    logger.info(f"Get oat pipeline id for migration: {url}.")

    response = requests.get(url, headers=headers)

    logger.info(f"Get oat pipeline id for migration, response: {response.text}")

    response.raise_for_status()

    response_data = response.json()

    return response_data["pipelineId"]


def is_registered_by_pipeline_id(token: str, pipeline_id: str) -> bool:
    """Get customer settings

    Returns:
        bool: True if the http status code is ok
    """
    url = f"{XDR_HOST_URL}/v1.0/preview/ath/oat/dataPipelines/{pipeline_id}"
    headers = get_header(
        {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
    )

    logger.info(f"Get oat customer setting from multiple config api: {url}.")

    response = requests.get(url, headers=headers)

    logger.info(f"Get oat customer setting response: {response.text}")

    return response.status_code == requests.codes.ok


def register_oat_and_get_pipeline_id(
    token: str, post_data: Dict[str, Any], clp_id: str
) -> str:
    """Register multiple config OAT

    Args:
        token (str): account token
        post_data (Dict): registration information for OAT
        clp_id (str): clp id

    Returns:
        str: pipeline_id
    """
    url = f"{XDR_HOST_URL}/v1.0/preview/ath/oat/dataPipelines"
    headers = get_header(
        {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
    )

    logger.info(f"Register oat from multiple config api: {url} \ndata: {post_data}")

    response = requests.post(url, headers=headers, json=post_data)

    logger.info(
        f"Register oat from multiple config api response: {response.text}, "
        f"header: {response.headers}"
    )

    response.raise_for_status()

    # The pipeline id is the segment after the last slash in the Location response header.
    location = response.headers.get("Location").split("/")
    pipeline_id = location[-1]

    if not pipeline_id:
        raise Exception(f"Pipeline id of customer [{clp_id}] is invalid.")

    logger.info(
        f"Customer [{clp_id}] registers to OAT successfully. pipeline id: {pipeline_id}"
    )

    return pipeline_id
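
# End-to-end sketch (an assumption about how the function app's trigger code
# composes this module, not something defined here): resolve the pipeline id
# for a customer, list the packages in a time window, then download each
# package and feed it to oat_file_handler(). The `token`, `clp_id`,
# `start_time`, and `end_time` values come from the caller, and the "id" key
# on each package item is a hypothetical field name.
#
#     pipeline_id = get_oat_pipeline_id(clp_id)
#     if pipeline_id:
#         count, packages = get_oat_package_list(token, start_time, end_time, pipeline_id)
#         for package in packages:
#             file = download_oat_file(token, package["id"], pipeline_id)
#             if file:
#                 oat_file_handler(clp_id, package["id"], file)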